EchoServer 案例
服务端的实现
NettyEchoServer:功能极其简单,服务端读取客户端输入的数据,然后将数据直接回显到控制台。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
| import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets;
public class NettyEchoServer { private final int port; ServerBootstrap b = new ServerBootstrap();
public NettyEchoServer(int port) { this.port = port; }
public void runServer() { EventLoopGroup bossLoopGroup = new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory()); EventLoopGroup workerLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); try { b.group(bossLoopGroup, workerLoopGroup);
b.channel(NioServerSocketChannel.class);
b.localAddress(new InetSocketAddress(port));
b.option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true);
b.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(NettyEchoServerHandler.INSTANCE); } });
ChannelFuture f = b.bind().sync(); System.out.println("Echo 服务器启动成功,监听端口:" + port);
f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossLoopGroup.shutdownGracefully(); workerLoopGroup.shutdownGracefully(); } }
@ChannelHandler.Sharable static class NettyEchoServerHandler extends ChannelInboundHandlerAdapter { public static final NettyEchoServerHandler INSTANCE = new NettyEchoServerHandler();
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg;
System.out.println("msg type: " + (in.hasArray() ? "堆内存" : "直接内存"));
int len = in.readableBytes(); byte[] arr = new byte[len]; in.getBytes(0, arr);
System.out.println("server received: " + new String(arr, StandardCharsets.UTF_8)); System.out.println("写回前,msg.refCnt:" + in.refCnt());
ChannelFuture f = ctx.writeAndFlush(msg);
f.addListener((ChannelFuture future) -> { System.out.println("写回任务状态:" + (future.isSuccess() ? "成功\n" : "失败\n")); }); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
public static void main(String[] args) { new NettyEchoServer(9000).runServer(); } }
|
客户端的实现
NettyEchoClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.nio.charset.StandardCharsets; import java.util.Scanner;
public class NettyEchoClient { private final int serverPort; private final String serverIp; Bootstrap b = new Bootstrap();
public NettyEchoClient(String ip, int port) { this.serverPort = port; this.serverIp = ip; }
public void runClient() { EventLoopGroup workerLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); try { b.group(workerLoopGroup); b.channel(NioSocketChannel.class); b.remoteAddress(serverIp, serverPort); b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); b.handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE); } });
ChannelFuture future = b.connect(); future.addListener((ChannelFuture futureListener) -> { if (futureListener.isSuccess()) { System.out.println("EchoClient客户端连接成功!"); } else { System.out.println("EchoClient客户端连接失败!"); } }); future.sync();
Channel channel = future.channel(); Scanner scanner = new Scanner(System.in); System.out.println("请输入发送内容:"); while (scanner.hasNext()) { String next = scanner.next(); byte[] bytes = next.getBytes(StandardCharsets.UTF_8); ByteBuf buffer = channel.alloc().buffer(); buffer.writeBytes(bytes); channel.writeAndFlush(buffer); System.out.println("请输入发送内容:"); } } catch (Exception e) { e.printStackTrace(); } finally { workerLoopGroup.shutdownGracefully(); } }
@ChannelHandler.Sharable static class NettyEchoClientHandler extends ChannelInboundHandlerAdapter { public static final NettyEchoClientHandler INSTANCE = new NettyEchoClientHandler();
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg; int len = byteBuf.readableBytes(); byte[] arr = new byte[len]; byteBuf.getBytes(0, arr); System.out.println("client received: " + new String(arr, StandardCharsets.UTF_8)); byteBuf.release(); } }
public static void main(String[] args) { NettyEchoClient nettyEchoClient = new NettyEchoClient("127.0.0.1", 9000); nettyEchoClient.runClient(); } }
|
半包问题的复现
问题演示
改造一下前面的 NettyEchoClient 实例,通过循环的方式向 NettyEchoServer 回显服务器写入大量的 ByteBuf,然后看看实际的服务器响应结果。注意:服务器类不需要改造,直接使用之前的回显服务器即可。改造好的客户端类——叫 NettyDumpSendClient。在客户端建立连接成功之后,使用一个 for 循环不断通过通道向服务端发送ByteBuf, 一直写到1000次,这些ByteBuf的内容相同,都是相同的字符串内容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| public class NettyDumpSendClient { private final String serverIp; private final int serverPort; Bootstrap b = new Bootstrap();
public NettyDumpSendClient(String ip, int port) { this.serverPort = port; this.serverIp = ip; }
public void runClient() { EventLoopGroup workerLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); try { b.group(workerLoopGroup); b.channel(NioSocketChannel.class); b.remoteAddress(serverIp, serverPort); b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); b.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE); } });
ChannelFuture future = b.connect(); future.addListener((ChannelFuture futureListener) -> { if (futureListener.isSuccess()) { System.out.println("NettyDumpSendClient客户端连接成功!"); } else { System.out.println("NettyDumpSendClient客户端连接失败!"); } }); future.sync();
Channel channel = future.channel(); String content = "密涅瓦的猫头鹰在黄昏起飞。"; byte[] bytes = content.getBytes(StandardCharsets.UTF_8); for (int i = 0; i < 1000; i++) { ByteBuf buffer = channel.alloc().buffer(); buffer.writeBytes(bytes); channel.writeAndFlush(buffer); } } catch (Exception e) { e.printStackTrace(); } finally { workerLoopGroup.shutdownGracefully(); } }
@ChannelHandler.Sharable static class NettyEchoClientHandler extends ChannelInboundHandlerAdapter { public static final NettyEchoClient.NettyEchoClientHandler INSTANCE = new NettyEchoClient.NettyEchoClientHandler();
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg; int len = byteBuf.readableBytes(); byte[] arr = new byte[len]; byteBuf.getBytes(0, arr); System.out.println("client received: " + new String(arr, StandardCharsets.UTF_8)); byteBuf.release(); } }
public static void main(String[] args) { NettyDumpSendClient client = new NettyDumpSendClient("127.0.0.1", 9000); client.runClient(); } }
|


仔细观察服务端的控制台输出,可以看出存在三种类型的输出:
- 读到一个完整的客户端输入ByteBuf。
- 读到多个客户端的ByteBuf输入,但是“粘”在了一起。
- 读到部分ByteBuf的内容,并且有乱码。
除了观察服务端的输出之外,再仔细观察客户端的输出,可以看 到客户端也存在以上三种类型的输出。对应于第1种情况接收到的完整的ByteBuf,这里称为“全包”。 对应于第2种情况,多个发送端的输入ByteBuf“粘”在了一起,这里 称为“粘包”。对应于第3种情况,一个发送过来的ByteBuf被“拆 开”接收,接收端读取到一个破碎的包,这里称为“半包”。为了简单起见,也可以将“粘包”的情况看成特殊的“半包”。 “粘包”和“半包”可以统称为传输的“半包问题”。
半包问题的本质
半包问题包含了 “粘包” 和 “半包” 两种情况:
- 粘包:接收端(Receiver)收到一个ByteBuf,包含了发送端(Sender)的多个ByteBuf,发送端的多个ByteBuf在接收端“粘” 在了一起。
- 半包:Receiver将Sender的一个ByteBuf“拆”开了收,收 到多个破碎的包。换句话说,Receiver收到了Sender的一个ByteBuf的 一小部分。
无论是粘包还是半包都不是一次正常的ByteBuf缓存区接收,具体如图所示:

粘包和半包的来源得从操作系统底层说起。我们知道,底层网络是以二进制字节报文的形式来传输数据的。读数据的过程大致为:当IO可读时,Netty 会从底层网络将二进制数据读到ByteBuf缓冲区中,再交给Netty程序转成Java POJO对象。写数据的过程大致为:编码器将一个Java类型的数据转换成底层能够传输的二进制ByteBuf缓冲数据。
在发送端 Netty 的应用层进程缓冲区中,程序以 ByteBuf 为单位来发送数据,但是到了底层操作系统内核缓冲区,底层会按照协议的规范对数据包进行二次封装,封装成传输层的协议报文,再进行发送。 在接收端收到传输层的二进制包后,首先复制到内核缓冲区,Netty读取ByteBuf时才复制到应用的用户缓冲区。在接收端,当Netty程序将数据从内核缓冲区复制到用户缓冲区的 ByteBuf时,问题来了:
- 每次读取底层缓冲的数据容量是有限制的,当TCP内核缓冲区的数据包比较大时,可能会将一个底层包分成多次ByteBuf进行复制,进而造成用户缓冲区读到的是半包。
- 当TCP内核缓冲区的数据包比较小时,一次复制的是不止一个内核缓冲区包,进而会造成用户缓冲区读到粘包。
如何解决呢?基本思路是,在接收端,Netty程序需要根据自定义协议将读取到的进程缓冲区ByteBuf在应用层进行二次组装,重新组装应用层的数据包。接收端的这个过程通常也称为分包或者拆包。在Netty中分包的方法主要有以下两种:
- 可以自定义解码器分包器:基于 ByteToMessageDecoder 或者 ReplayingDecoder,定义自己的用户缓冲区分包器。
- 使用 Netty 内置的解码器。例如,可以使用 Netty 内置的 LengthFieldBasedFrameDecoder 自定义长度数据包解码器对用户缓冲区 ByteBuf 进行正确的分包。
标题:
Java NIO - Netty 的一些简单案例及半包问题的演示